package boot.spring.elastic.service.impl;

import boot.spring.elastic.service.IndexService;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.*;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;



@Slf4j
public class IndexServiceImpl implements IndexService {

	private RestHighLevelClient client;

	public IndexServiceImpl(RestHighLevelClient client){
		this.client = client;
	}

	@Override
	public boolean deleteIndex(String indexName) {
		log.info("deleteIndex——删除一个索引是, indexName:[ {} ] ", indexName  );
		try {
			DeleteIndexRequest request = new DeleteIndexRequest(indexName);
			client.indices().delete(request, RequestOptions.DEFAULT);
			return true;
		} catch (IOException exception) {
		} catch (ElasticsearchException exception) {
			if (exception.status() == RestStatus.NOT_FOUND) {
				log.warn("deleteIndex——删除一个索引，索引不存在, indexName:[ {} ] ", indexName  );
			}
		}
		return false;
	}

	@Override
	public boolean existIndex(String indexName) {

		log.info("existIndex——判断一个索引是否存在, indexName:[ {} ] ", indexName  );

		GetIndexRequest request = new GetIndexRequest(indexName);
		try {
			boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
			return exists;
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		return false;
	}

	@Override
	public void createMapping(String indexName, XContentBuilder mapping) {
		log.info("createMapping_创建索引映射开始, indexName:[ {} ], mapping:[ {} ]", indexName , mapping);
		try {
			CreateIndexRequest index = new CreateIndexRequest(indexName);
			index.source(mapping);
			client.indices().create(index, RequestOptions.DEFAULT);
			log.info("createMapping_创建索引映射成功！！！, indexName:[ {} ], mapping:[ {} ]", indexName , mapping);

		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	@Override
	public void indexDoc(String indexName, String indexType, Map<String, Object> doc) {
		log.info("indexDoc_索引一篇文档, indexName:[ {} ], indexType:[ {} ], doc:[ {} ]", indexName , indexType, doc);

		IndexRequest indexRequest = new IndexRequest(indexName, indexType, (String)doc.get("key")).source(doc);
		try {
		    IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT);

			log.info("indexDoc_索引一篇文档成功！！！, indexName:[ {} ], indexType:[ {} ], doc:[ {} ]", indexName , indexType, response);

		} catch(ElasticsearchException e ) {
		    if (e.status() == RestStatus.CONFLICT) {
		    	System.out.println("写入索引产生冲突"+e.getDetailedMessage());
		    }
		} catch(IOException e) {
			e.printStackTrace();
		}
	}

	@Override
	public void indexDocWithRouting(String indexName, String indexType, String route, Map<String, Object> doc) {
		log.info("indexDocWithRouting_带路由索引一篇文档, indexName:[ {} ], indexType:[ {} ], route:[ {} ], doc:[ {} ]", indexName , indexType, route, doc);
		IndexRequest indexRequest = new IndexRequest(indexName, indexType, (String)doc.get("key")).source(doc);
		indexRequest.routing(route);
		try {
			IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT);

			log.info("indexDocWithRouting_带路由索引一篇文档成功！！！, indexName:[ {} ], indexType:[ {} ], route:[ {} ], doc:[ {} ]", indexName , indexType, route, doc);

		} catch(ElasticsearchException e ) {
			if (e.status() == RestStatus.CONFLICT) {
				System.out.println("写入索引产生冲突"+e.getDetailedMessage());
			}
		} catch(IOException e) {
			e.printStackTrace();
		}
	}
	

	@Override
	public void indexDocs(String indexName, String indexType, List<Map<String, Object>> docs) {
		log.info("indexDocs_索引一组文档, indexName:[ {} ], indexType:[ {} ], docs:[ {} ]", indexName , indexType, docs);

		try {
	            if (null == docs || docs.size() <= 0) {
	                return;
	            }
	            BulkRequest request = new BulkRequest();
	            for (Map<String, Object> doc : docs) {
	                request.add(new IndexRequest(indexName, indexType, (String)doc.get("key"))
	                            .source(doc));
	            }
	            BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
	            if (bulkResponse != null) {
	                for (BulkItemResponse bulkItemResponse : bulkResponse) {
	                    DocWriteResponse itemResponse = bulkItemResponse.getResponse();

	                    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
	                            || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
	                        IndexResponse indexResponse = (IndexResponse) itemResponse;
							System.out.println("indexDocs_索引一组文档_新增成功: " + indexResponse.toString());

							log.info("indexDocs_索引一组文档_新增成功！！！, indexName:[ {} ], indexType:[ {} ], indexResponse:[ {} ]", indexName , indexType, indexResponse);

						} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
	                        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
							System.out.println("indexDocs_索引一组文档_修改成功: " + updateResponse.toString());
							log.info("indexDocs_索引一组文档_修改成功！！！, indexName:[ {} ], indexType:[ {} ], updateResponse:[ {} ]", indexName , indexType, updateResponse);

						} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
	                        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
	                        System.out.println("删除成功:" + deleteResponse.toString());
							log.info("indexDocs_索引一组文档_删除成功！！！, indexName:[ {} ], indexType:[ {} ], deleteResponse:[ {} ]", indexName , indexType, deleteResponse);

						}
	                }
	            }

	        } catch (IOException e) {
	            e.printStackTrace();
	        }
	}


	@Override
	public void indexDocsWithRouting(String indexName, String indexType, List<Map<String, Object>> docs) {

		log.info("indexDocsWithRouting_带路由索引一组文档, indexName:[ {} ], indexType:[ {} ], docs:[ {} ]", indexName , indexType, docs);

		try {
			if (null == docs || docs.size() <= 0) {
				return;
			}
			BulkRequest request = new BulkRequest();
			for (Map<String, Object> doc : docs) {
				HashMap<String, Object> join = (HashMap<String, Object>)doc.get("joinkey");
				String route = (String)join.get("parent");
				request.add(new IndexRequest(indexName, indexType, (String)doc.get("key"))
						.source(doc).routing(route));
			}
			BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
			if (bulkResponse != null) {
				for (BulkItemResponse bulkItemResponse : bulkResponse) {
					DocWriteResponse itemResponse = bulkItemResponse.getResponse();

					if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
							|| bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
						IndexResponse indexResponse = (IndexResponse) itemResponse;
						System.out.println("新增成功" + indexResponse.toString());
						log.info("indexDocsWithRouting_带路由索引一组文档——新增成功！！！, indexName:[ {} ], indexType:[ {} ], indexResponse:[ {} ]", indexName , indexResponse);

					} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
						UpdateResponse updateResponse = (UpdateResponse) itemResponse;
						System.out.println("修改成功" + updateResponse.toString());
						log.info("indexDocsWithRouting_带路由索引一组文档——修改成功！！！, indexName:[ {} ], indexType:[ {} ], updateResponse:[ {} ]", indexName , updateResponse);
					} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
						DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
						System.out.println("删除成功" + deleteResponse.toString());
						log.info("indexDocsWithRouting_带路由索引一组文档——删除成功！！！, indexName:[ {} ], indexType:[ {} ], deleteResponse:[ {} ]", indexName , deleteResponse);
					}
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	@Override
	public int deleteDoc(String indexName, String indexType, String id) {

		log.info("deleteDoc——删除一篇文档, indexName:[ {} ], indexType:[ {} ], id:[ {} ]", indexName , id);

		DeleteResponse deleteResponse = null;
		DeleteRequest request = new DeleteRequest(indexName, indexType, id);
		try {
			deleteResponse = client.delete(request, RequestOptions.DEFAULT);
			System.out.println("删除成功" + deleteResponse.toString());
			log.info("deleteDoc——删除一篇文档--删除成功！！！, indexName:[ {} ], indexType:[ {} ], id:[ {} ], deleteResponse:[ {} ]", indexName , id, deleteResponse);


			if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
				System.out.println("删除失败，文档不存在" + deleteResponse.toString());
				log.error("deleteDoc——删除一篇文档--删除失败，文档不存在！！！！, indexName:[ {} ], indexType:[ {} ], id:[ {} ], deleteResponse:[ {} ]", indexName , id, deleteResponse);
				return -1;
			}
		} catch (ElasticsearchException e) {
			if (e.status() == RestStatus.CONFLICT) {
				System.out.println("删除失败，版本号冲突" + deleteResponse.toString());
				log.error("deleteDoc——删除一篇文档--删除失败，删除失败，版本号冲突！！！！, indexName:[ {} ], indexType:[ {} ], id:[ {} ], deleteResponse:[ {} ]", indexName , id, deleteResponse);

				return -2;
		    }
		} catch (IOException e) {
			e.printStackTrace();
			return -3;
		}
		//

		return 1;
	}

	@Override
	public int deleteDoc(String indexName, String indexType, String filedName, Object value) {
		log.info("deleteDoc——删除一篇文档, indexName:[ {} ], indexType:[ {} ], filedName:[ {} ], value:[ {} ]", indexName , filedName, JSON.toJSONString(value));

		BulkByScrollResponse deleteResponse = null;
		try {
			DeleteByQueryRequest request = new DeleteByQueryRequest(indexName);
			request.setDocTypes(indexType);
			request.setQuery(new TermQueryBuilder(filedName, value));
			request.setAbortOnVersionConflict(false);

			BulkByScrollResponse resp = client.deleteByQuery(request, RequestOptions.DEFAULT);
			long deleted = resp.getDeleted();
			log.info("deleteDoc——删除一篇文档, indexName:[ {} ], indexType:[ {} ], filedName:[ {} ], value:[ {} ]，删除【 {} 】条文档", indexName , filedName, JSON.toJSONString(value), deleted);
		} catch (ElasticsearchException e) {
			if (e.status() == RestStatus.CONFLICT) {
				log.error("deleteDoc——删除一篇文档失败ElasticsearchException, indexName:[ {} ], indexType:[ {} ], filedName:[ {} ], value:[ {} ], deleteResponse:[ {} ]", indexName , filedName, JSON.toJSONString(value), JSON.toJSONString(deleteResponse));
				return -2;
			}
		} catch (IOException e) {
			e.printStackTrace();
			log.error("deleteDoc——删除一篇文档失败IOException, indexName:[ {} ], indexType:[ {} ], filedName:[ {} ], value:[ {} ], deleteResponse:[ {} ]", indexName , filedName, JSON.toJSONString(value), JSON.toJSONString(deleteResponse));
			return -3;
		}
		return 1;
	}

	@Override
	public void updateDoc(String indexName, String indexType, Map<String, Object> doc) {

		log.info("updateDoc——修改一篇文档, indexName:[ {} ], indexType:[ {} ], doc:[ {} ]", indexName , doc);

		UpdateRequest request = new UpdateRequest(indexName, indexType, (String) doc.get("key")).doc(doc);
		request.docAsUpsert(true);
		try {
			UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
			long version = updateResponse.getVersion();

			if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
				System.out.println("insert success, version is " + version);
				log.info("updateDoc——修改一篇文档--新增成功！！！, indexName:[ {} ], indexType:[ {} ], updateResponse:[ {} ]", indexName , updateResponse);


			} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
				System.out.println("update success, version is " + version);
				log.info("updateDoc——修改一篇文档--修改成功！！！, indexName:[ {} ], indexType:[ {} ], updateResponse:[ {} ]", indexName , updateResponse);
			} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
				log.info("updateDoc——修改一篇文档--删除成功！！！, indexName:[ {} ], indexType:[ {} ], updateResponse:[ {} ]", indexName , updateResponse);

			} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
				log.warn("updateDoc——修改一篇文档--无操作！！！, indexName:[ {} ], indexType:[ {} ], updateResponse:[ {} ]", indexName , updateResponse);

			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}


}
